# Producer 源码分析

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


# 1 消息发送整体流程

下面是一个生产者发送消息的demo(同步发送)

public class SyncProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 设置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //producer.setSendLatencyFaultEnable(true);
        // 启动Producer实例
        producer.start();


        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

主要做了几件事:

  • 初始化一个生产者(DefaultMQProducer)对象
  • 设置 NameServer 的地址
  • 启动生产者
  • 发送消息

# 2 消息发送者启动流程

RocketMQ 的 Producer 启动流程由 producer.start() 触发,最终通过 DefaultMQProducerImpl 完成客户端初始化、网络组件启动、路由定时刷新等行为。下图展示核心流程概览:

image.png

# 2.1 Producer.start() 入口

DefaultMQProducer.start() 本质调用的是内部实现类:

@Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        //todo Producer的启动流程核心是defaultMQProducerImpl.start();
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("acl dispatcher start failed ", e);
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2.2 生产者核心启动流程

DefaultMQProducerImpl.start(), 生产者启动, 截取关键流程代码

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            // 1. 基础配置校验(ProducerGroup 是否合规)
            this.checkConfig();

            // 2. instanceName 置为进程 PID(避免冲突)
            if (!this.defaultMQProducer.getProducerGroup()
                    .equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // 3. 获取/创建 MQClientInstance(JVM 全局唯一)
            this.mQClientFactory =
                    MQClientManager.getInstance()
                       .getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // 4. 注册 Producer 到 MQClientInstance(Producer → MQClientInstance 映射)
            boolean registerOK =
                mQClientFactory.registerProducer(
                       this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                throw new MQClientException("producer group duplicate", null);
            }

            // 5. 创建默认 Topic 路由缓存
            this.topicPublishInfoTable.put(
                    this.defaultMQProducer.getCreateTopicKey(),
                    new TopicPublishInfo());

            // 6. 启动 MQClientInstance(真正启动网络通信、定时任务、拉取线程)
            if (startFactory) {
                mQClientFactory.start();
            }

            this.serviceState = ServiceState.RUNNING;
            break;

        default:
            throw new MQClientException("Producer already started or illegal state", null);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

核心总结: Producer 本身只是“轻量客户端”,真正的网络通信、路由管理、心跳发送等全部委托给 MQClientInstance(JVM 内唯一)

# 2.3 关键步骤解析

# 1. 配置校验:checkConfig()

private void checkConfig() throws MQClientException {
    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
    //非空判断
    if (null == this.defaultMQProducer.getProducerGroup()) {
        throw new MQClientException("producerGroup is null", null);
    }
    // 禁止使用系统默认组名
    if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

# 2. 获取 MQClientInstance

一个 JVM 中所有 Producer / Consumer 共享一个 MQClientInstance(同 clientId 才会复用)

ConcurrentMap<String, MQClientInstance> factoryTable;
1

获取流程:

public MQClientInstance getOrCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId(); // IP@instanceName@unitName

    MQClientInstance instance = factoryTable.get(clientId);
    if (instance == null) {
        instance = new MQClientInstance(...);
        MQClientInstance prev = factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) instance = prev;
    }
    return instance;
}
1
2
3
4
5
6
7
8
9
10
11

clientId 规则:

clientIP @ instanceName @ unitName
1

➡ 若三个字段一致,则 Producer / Consumer 将共享同一个 MQClientInstance,要特别注意 groupName 冲突问题!

RocketMQ中消息发送者、消息消费者都属于”客户端“

每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

故不同的生产者、消费端,如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

image.png

# 2.4 客户端实际启动流程

Producer 的网络组件、定时任务、消费者线程等,全部由 MQClientInstance 启动:

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                // 1. 获取 NameServer 地址
                if (clientConfig.getNamesrvAddr() == null) {
                    mQClientAPIImpl.fetchNameServerAddr();
                }

                // 2. 启动 RPC 通道(底层 Netty)
                mQClientAPIImpl.start();

                // 3. 启动各种定时任务(更新路由、发送心跳等)
                startScheduledTask();

                // 4. 消费者专用线程:拉消息
                pullMessageService.start();

                // 5. 消费者专用线程:负载均衡
                rebalanceService.start();

                // 6. 最终反向调用 Producer.start(false)
                defaultMQProducer.getDefaultMQProducerImpl().start(false);

                this.serviceState = ServiceState.RUNNING;
                break;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 2.5 定时任务

Producer 与 Broker 的关系 不是实时同步,而是依赖定时任务维持, 具体可查看源码startScheduledTask()方法。

关键任务如下:

  1. 每 2 分钟:更新 NameServer 地址

    fetchNameServerAddr()
    
    1
  2. 每 30 秒:更新 Topic 路由信息

    updateTopicRouteInfoFromNameServer()
    
    1
  3. 每 30 秒:向 Broker 发送心跳

    维护 Producer/Consumer 与 Broker 的连接。

# 3 消息发送队列选择

Producer 发送消息前,必须从 Topic 的多个 MessageQueue 中选择一个最终队列。RocketMQ 提供两套策略:

  • 默认轮询策略(Round-Robin)
  • Broker 故障延迟规避策略(Latency Fault Tolerance)

选择策略的流程如下:

image.png

两者的核心入口都来自:

DefaultMQProducerImpl.sendDefaultImpl()selectOneMessageQueue()
1
2

# 3.1 默认选择队列策略

默认情况下,RocketMQ 使用 最简单可靠的轮询算法

int index = tpInfo.getSendWhichQueue().getAndIncrement();
MessageQueue mq = tpInfo.getMessageQueueList().get(index % queueSize);
1
2

特点:

  • 均匀分布:消息尽可能平均落入每个队列
  • 无状态简单:不依赖历史,只根据当前线程的 index
  • 适用于网络稳定、Broker基本不可用的情况

注意: 如果发送失败进入重试(默认重试 2 次 → 共 3 次发送尝试),轮询也会跳过失败队列。

# 3.2 故障延迟机制策略

在网络不稳定、Broker 坏节点较容易出现的场景中,可开启:

sendLatencyFaultEnable = true
1

核心思想:

根据发送耗时/失败情况,为每个 Broker 计算一个“不可用时长”,下次选择队列时跳过这些 Broker。

流程:

  1. 每次发送消息后记录延迟
  2. 调用 updateFaultItem() 更新 Broker 的延迟与不可用时间窗口
  3. 下次选队列时,优先跳过不可用的 Broker
  4. 若所有 Broker 都处于不可用窗口,则选择一个“最不差的 Broker”兜底

# 3.2.1 Producer 发送流程中的队列选择关键部分

核心代码片段(简化):

MessageQueue mqSelected =
    this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
1
2

selectOneMessageQueue() 中:

若打开延迟规避:

if (sendLatencyFaultEnable) {
    int index = tpInfo.getSendWhichQueue().getAndIncrement();
    
    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
        int pos = index % tpInfo.getMessageQueueList().size();
        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
        
        // 只选择可用 Broker
        if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
            return mq;

        index++;
    }

    // 所有 Broker 都不可用,兜底选一个“最不差的”
    String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    return new MessageQueue(topic, notBestBroker, randomIndex % writeQueueNums);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 3.2.2 延迟容错计算逻辑

每次发送成功/失败后会调用:

updateFaultItem(brokerName, latency, isolation)
1

其中 latency 为发送耗时,失败时按 30s 兜底。

计算“不可用时间窗口”:

发送耗时 ≥ 不可用时长
50ms 0
100ms 0
550ms 30s
1000ms 60s
2000ms 120s
3000ms 180s
15000ms 600s

源码:

private long computeNotAvailableDuration(long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return notAvailableDuration[i];
    }
    return 0;
}
1
2
3
4
5
6
7

整体效果:

  • 发送延迟大表示 Broker 负载高 / 网络有问题 → 临时回避
  • 避免连续向“坏节点”发送
  • 提高整体发送成功率

# 3.3 两种策略的使用建议

场景 推荐策略
内网环境、网络稳定、Broker 基本正常 默认轮询(简单高效)
跨机房、网络抖动、Broker 时常不可用 延迟容错策略(高可用)

延迟容错策略优点:

  • 自动跳过不可用 Broker
  • 显著提升发送成功率
  • 避免重试时连续命中坏节点

注意前提: Topic 必须创建在 多个 Broker 上,否则规避没有意义。

# 3.4 技术亮点:ThreadLocal

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();
1
2
3
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //基于ThreadLocal,多线程安全
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
1
2
3
4
5
6
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //todo 默认不走这里:Broker故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                //对消息队列轮询获取一个队列
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    //基于index和队列数量取余,确定位置
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
1
2
3
4
5
6
7
8
9

# 4 客户端建立连接的时机

RocketMQ 的 Producer / Consumer 与 Broker 并不是在 start() 时立即建立连接,而是遵循 按需建立(lazy connect)+ 长连接复用 的设计。

下面通过源码和时序说明详细拆解。

# 4.1 Producer 什么时候与 Broker 建立连接?

调用顺序如下:

producer.start();
SendResult result = producer.send(msg);
1
2

问题是:

producer.start() 时会连接 Broker 吗? ✘ 不会直接创建与 Broker 的连接

✔ 发送时会连接 Broker 吗? ✔ 是的:第一次发送消息或第一次访问 Broker 时才会建连

RocketMQ 采用 按需创建连接(lazy creating) 的策略。

# 4.2 源码证明

Producer 的发送链路:

send()sendDefaultImpl()sendKernelImpl()  
→ doRequest  
→ invokeSync()getAndCreateChannel()
1
2
3
4
5
6

关键代码:

public RemotingCommand invokeSync(String addr, RemotingCommand request, long timeout)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {

    // ★ 关键点:在这里创建与 Broker 的物理连接(TCP + Netty Channel)
    final Channel channel = this.getAndCreateChannel(addr);
    return this.invokeSyncImpl(channel, request, timeout);
}
1
2
3
4
5
6
7

关键结论:

连接的建立发生在第一次需要与 Broker 通信时(例如发送消息)。

# 4.3 按需建连的核心逻辑

MQClientAPIImpl.getAndCreateChannel() 的逻辑如下:

  1. 查看内部缓存(一个 Broker 地址对应一个 Channel)
  2. 若已存在且仍可用 → 直接复用
  3. 若不存在 → 创建新的 Netty Channel
  4. 建立 TCP 长连接后放入缓存

伪代码结构:

ChannelWrapper cw = channelTables.get(addr);
if (cw != null && cw.isOK()) {
    return cw.getChannel();
}

// ★ 这里真正建立连接(Netty Bootstrap.connect)
Channel ch = this.createNewChannel(addr);

// 放入缓存
channelTables.put(addr, new ChannelWrapper(ch));

return ch;
1
2
3
4
5
6
7
8
9
10
11
12

这段逻辑直接说明:

客户端在真正需要访问某个 Broker 的时候才创建连接,比如第一次发送消息。

主要由于以下设计目的:

  1. 减少无效连接(按需建立更合理)

    Producer 未必一启动就会发送消息,也未必发送到所有 Broker。

  2. 提高大规模集群网络利用率

    假如一个 Topic 有 20 个队列、分布在多个 Broker 上→ Producer 没必要一次性与所有 Broker 建连

  3. MQClientInstance 已经在 start() 中启动心跳、路由刷新等任务但并不会发起实际业务通信,因此也不会主动触发建连。